Synthetic pulsar timeseries

In this notebok, I'll generate a time series for testing the Cobalt beamformed pipeline. The output will be an ascii file containing the data for a single sub band, one sample per line, with four integers representing thre real and imaginary parts of the x signal, and the real and imaginary parts of the y signal, respectively.


In [1]:
%pylab inline

def float_to_int(time_series_x, time_series_y):
    scale = 127/max(abs(time_series_x).max(), abs(time_series_y).max())
    x_re = rint(scale*time_series_x.real)
    x_im = rint(scale*time_series_x.imag)
    y_re = rint(scale*time_series_y.real)
    y_im = rint(scale*time_series_y.imag)
    return x_re, x_im, y_re, y_im

def write_timeseries(time_series_x, time_series_y, file_name):
    x_re, x_im, y_re, y_im = float_to_int(time_series_x, time_series_y)
    with open(file_name, mode='w') as output:
        for xre, xim, yre, yim in zip(x_re, x_im, y_re, y_im):
            output.write('%4d %4d %4d %4d\n' %
                             (int(xre), int(xim), int(yre), int(yim)))


Populating the interactive namespace from numpy and matplotlib

We generate the timeseries by beginning with random Gaussian noise for the real and imaginary parts, to obtain a flat spectral response. The resulting timeseries is subsequently multiplied with the amplitude of the pulsar signal as a function of time.


In [2]:
def complex_noise(num_samples, sigma=1.0):
    r'''
    Return numpy.ndarray of complex valued random noise with
    sigma(real) == sigma(imag) == sigma.
    
    **Parameters**
    
    num_samples : integer
        Total number of complex samples to return.
        
    sigma : float
        The standard deviation of the real and imaginary parts
        
    >>> len(complex_noise(10))
    10
    '''
    return random.normal(loc=0.0, scale=sigma, size=num_samples) + 1.0j*random.normal(loc=0.0, scale=sigma, size=num_samples)

We also need to define the parameters of the simulation, and generate the white noise timeseries:


In [3]:
time_resolution_s = 512./100e6
duration_s = 20.0
num_samples = int(ceil(duration_s/time_resolution_s))
time = arange(num_samples)*time_resolution_s
white_noise_x = complex_noise(num_samples)
white_noise_y = complex_noise(num_samples)

In [4]:
write_timeseries(white_noise_x, white_noise_y, 'synthetic-white-noise-1SB.txt')

In [5]:
figure(figsize=(16,4.5))
x_re, x_im, y_re, y_im = float_to_int(white_noise_x, white_noise_y)
wn_spec = fft.fft(x_re**2 +x_im**2 + y_re**2 +y_im**2)
wn_spec[0] = 0
plot(abs(wn_spec[0:2000]))


Out[5]:
[<matplotlib.lines.Line2D at 0x40435f50>]

We now define the pulsar as a python function that provides the amplitude as a function of time.


In [6]:
def pulsar_amplitude(time_s, period_s, fraction_on):
    r'''
    Compute the amplitude of the "pulsar" signal as a function of time.
    
    **Parameters**
    
    time_s : array of float
        The time in seconds at which to evaluate the amplitude.
        
    period_s : float
        The period of the block wave in s.
    
    fraction_on: float
        Fraction of the period that the pulse must be "on". It will have
        zero amplitude in the rest of the period.
        
    **Example**
    
    >>> sorry, nothing yet.
    '''
    time_in_periods = time_s/period_s
    amplitudes = 1.0*(time_in_periods - floor(time_in_periods) < fraction_on)
    return amplitudes

In [7]:
figure(figsize=(16, 4.5))
plot(time[0:200000], pulsar_amplitude(time[0:200000], 0.1, 0.1), linewidth=3)
xlabel('Time [s]')
ylabel('Amplitude [arbitrary]')
title('Pulsar amplitude with period 100 ms and duty-cycle of 10%')


Out[7]:
<matplotlib.text.Text at 0x4040dcd0>

In [8]:
psr_amp = pulsar_amplitude(time, period_s=0.1, fraction_on=0.1)
signal_x = white_noise_x*psr_amp
signal_y = white_noise_y*psr_amp

selection = 50000
figure(figsize=(16, 4.5))
plot(time[:selection], real(signal_x[:selection]), label='Real [x]', color='blue', linewidth=3, alpha=0.5)
plot(time[:selection], real(signal_y[:selection]), label='Real [y]', color='red', linewidth=3, alpha=0.5)
legend()
xlabel('Time [s]')
ylabel('Voltage [arbitrary]')
title('Simulated noiseless pulsar time series, one subband, no dispersion, critically sampled')


Out[8]:
<matplotlib.text.Text at 0x40391a90>

In [9]:
write_timeseries(signal_x, signal_y, 'synthetic_block_psr_100ms_no_noise.txt')

In [10]:
figure(figsize=(16,9))
dynspec_x = array([fft.fftshift(fft.fft(spec)) for spec in (signal_x[:64*int(floor(len(signal_x)/64.))].reshape((-1, 64)))[:800,:]])

imshow(abs(dynspec_x).T, origin='lower', interpolation='nearest')
colorbar()
axis('tight')
xlabel('time slot')
ylabel('channel')


Out[10]:
<matplotlib.text.Text at 0x41bfa10>

We now need a specific frequency spectrum. A FIR filter with a Gaussian frequency response would enable easy fitting for the exact frequency of the peak. The (un-normalized) functional form of the frequency response that we require is

\begin{equation} f(\nu) = \mathrm{e}^{-\frac{(\nu - \nu_\mathrm{peak})^2}{2\sigma_\nu^2}} \end{equation}

Given that LOFAR uses a Fourier transform with a minus sign in the exponent for the time-to-frequency transform, we have to use the transform with a plus sign in the exponent to transform this response to the time domain. The unnormalized result is

\begin{equation} w(t) = \mathrm{e}^{-2\pi^2t^2\sigma_\nu^2}\mathrm{e}^{+2\pi\mathrm{i}\nu_\mathrm{peak} t} \end{equation}

In [11]:
def raw_fir_coefficients(time_s, peak_hz, sigma_hz):
    r'''
    Compute
    
    math ::
        w(t) = \mathrm{e}^{-2\pi^2t^2\sigma_\nu^2}\mathrm{e}^{+2\pi\mathrm{i}\nu_\mathrm{peak} t}

    '''
    return exp(2.j*pi*peak_hz*time_s -2*(pi*time_s*sigma_hz)**2)

In [12]:
filter_length = 513
filter_time_s = (arange(filter_length) - floor(filter_length/2))*time_resolution_s
f_k = raw_fir_coefficients(filter_time_s, peak_hz=-14038.086, sigma_hz=12e3)
f_k /= f_k.sum()
fir_taper = kaiser(filter_length, beta=14.0)

spectral_signal_x = convolve(white_noise_x, f_k*fir_taper, mode='valid')
spectral_signal_x *= psr_amp[:len(spectral_signal_x)]
spectral_signal_y = convolve(white_noise_y, f_k*fir_taper, mode='valid')
spectral_signal_y *= psr_amp[:len(spectral_signal_y)]

In [52]:
figure(figsize=(16,9))
dynspec_x = array([fft.fftshift(fft.fft(spec*kaiser(len(spec), beta=14.))) for spec in (spectral_signal_x[:64*int(floor(len(spectral_signal_x)/64.))].reshape((-1, 64)))[:800,:]])
dynspec_y = array([fft.fftshift(fft.fft(spec*kaiser(len(spec), beta=14.))) for spec in (spectral_signal_y[:64*int(floor(len(spectral_signal_x)/64.))].reshape((-1, 64)))[:800,:]])

stokes_i = abs(dynspec_x)**2 + abs(dynspec_y)**2
stokes_i /= stokes_i.max()
imshow(log10(abs(stokes_i)).T, origin='lower', interpolation='nearest')
colorbar()
axis('tight')
xlabel('time slot')
ylabel('channel')


Out[52]:
<matplotlib.text.Text at 0x254c0ad0>

write_timeseries(spectral_signal_x, spectral_signal_y, 'synthetic_block_psr_100ms_gauss_spectrum_no_noise.txt')

We also need to test multiple sub bands.

Now, what do the data look like?


In [61]:
tf_data_gauss_spectrum = fromfile('tab0-gauss.raw', dtype=float32).reshape((-1, 64))
tf_data_gauss_spectrum /= tf_data_gauss_spectrum.max()

In [62]:
figure(figsize=(16, 9), dpi=100)
imshow(log10(tf_data_gauss_spectrum.T[:,:800]), interpolation='nearest', origin='lower')
axis('tight')
colorbar()


Out[62]:
<matplotlib.colorbar.Colorbar instance at 0x25a47d88>

In one plot:


In [86]:
dynspec_time_ms = time[32::64]*1000
extent = (dynspec_time_ms[0], dynspec_time_ms[799], -0.5, 63.5)
figure(figsize=(16, 9), dpi=100)
subplot(211)
imshow(log10(abs(stokes_i)).T, origin='lower', interpolation='nearest', vmax=0.0, vmin=-10, extent=extent)
cb_input = colorbar()
cb_input.set_label(r'$^{10}$log(I)')
axis('tight')
xlabel('Time [ms]')
ylabel('channel')
title('Input data')
grid()
subplot(212)
offset=7
imshow(log10(tf_data_gauss_spectrum.T[:,offset:800+offset]), interpolation='nearest', origin='lower', vmax=0.0, vmin=-10, extent=extent)
axis('tight')
cb_output = colorbar()
cb_output.set_label(r'$^{10}$log(I)')
xlabel('Time [ms]')
ylabel('channel')
title('Cobalt output')
grid()
savefig('../cobalt-commissioning-report/cobalt-bf-dynamic-spectrum-gauss-pulsar.pdf')



In [107]:
figure(figsize=(16, 4.5), dpi=200)

input_spectrum = stokes_i.sum(axis=0)
input_spectrum /= input_spectrum.max()
output_spectrum = tf_data_gauss_spectrum.sum(axis=0)
output_spectrum /= output_spectrum.max()

#plot(input_spectrum, color='blue', linewidth=3, label='Input')
plot(output_spectrum, color='blue', linewidth=3, label='Cobalt output')
xlabel('Channel')
ylabel('Amplitude')

title('Spectral response')
legend()
grid()
xticks(arange(64), ['%d' % chan for chan in arange(64)])
xlim(20,40)

peak_channel = 32-14038.086/(100e6/512./64.)
annotate('Expected peak', (peak_channel, 0.98), xytext=(peak_channel, 0.7), arrowprops={'linewidth':1}, rotation=90, horizontalalignment='center')
savefig('../cobalt-commissioning-report/cobalt-bf-spectrum-gauss-pulsar.pdf')



In [108]:
figure(figsize=(16, 4.5), dpi=200)

input_ts = stokes_i.sum(axis=1)
input_ts /= input_ts.max()
output_ts = tf_data_gauss_spectrum.sum(axis=1)
output_ts /= output_ts.max()

plot(dynspec_time_ms[:800], input_ts[:800], color='red', linewidth=3, label='Input')
plot(dynspec_time_ms[:800] -2.5, output_ts[:800], color='blue', linewidth=3, label='Cobalt output')
xlabel('Time [ms]')
ylabel('Amplitude')
xticks(arange(200), ['%d' % chan for chan in arange(200)])

title('Temporal response')
legend()
grid()
xlim(98,112)
yscale('log')
ylim(1e-3, 1.0)
savefig('../cobalt-commissioning-report/cobalt-bf-timeseries-gauss-pulsar.pdf')



In [ ]: